1 Star 10 Fork 2

dpwgc / kapokmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

KapokMQ

基于Go整合Gossip+WebSocket的轻量级分布式消息队列

Go Go github star fork


KapokMQ是一个使用Go语言实现的,单机TPS超10万的轻量级分布式消息队列

特点:

  • 部署KapokMQ无需安装任何外部依赖应用/环境,单文件应用,且配置简单,启动方便。

  • 支持多种消息推送方式(订阅/发布推送、点对点推送、延时消息发布)。

  • 消息队列与客户端采用WebSocket连接,全双工通讯,消息与ACK发送都在一条连接上,简易高效。

  • 集群基于Gossip构建,可自动探测集群节点,能轻易做到水平扩展、断线重连。

  • 具有主从节点消息同步功能,主节点宕机后,从节点可以自动接手消息推送工作。

  • 提供全量数据持久化方案及WAL预写日志记录,用户可在高性能与高可靠之间自由选择。

  • 内置网页端控制台,可直接通过网页查看消息队列的运行情况。


KapokMQ与Serena应用整合包下载与安装

注册中心源码 ~ Serena

Golang客户端 ~ kapokmq-go-client

控制台前端源码 ~ kapokmq-console


软件架构

avatar avatar


实现功能

订阅/发布推送模式:

  • 将单条消息通过WebSocket结合协程并发推送到多个与消息topic主题相同的消费者客户端。

点对点推送模式:

  • 如果有多个消费者客户端连接到消息队列,将消息随机推送给其中一个与消息topic主题相同的客户端。

延时消息发布:

  • 可对单条消息设定延时时间,秒级延时推送消息,投送时间精确度受mq.checkSpeed消息检查速度的影响。

ACK消息确认机制:

  • 消息队列接收到消息后,将向生产者发送确认接收ACK,可确保消息不在写入WAL日志之前丢失。
  • 消费者接收到消息后,将向消息队列发送确认消费ACK,可确保消息不在消息消费环节丢失。

avatar

主从节点部署

  • 可为单个消息队列节点绑定一个从节点,从节点消息列表会与主节点消息列表保持同步。
  • 主节点健在时,从节点不会进行消息推送,当主节点宕机后,从节点会把剩余的未消费消息推送给备用消费者客户端。

avatar

负载均衡集群部署:

  • 采用Gossip协议连接与同步集群节点,生产者客户端从注册中心获取所有消息队列节点地址并与它们连接,进行负载均衡投递(将消息随机投送到其中一个消息队列节点)。可做到不停机水平扩展。

消息推送失败自动重试机制:

  • 定期重推未确认消费且超时的消息(受mq.checkSpeed消息检查速度的影响)。

KV型内存数据存储:

  • 采用sync.Map存储所有消息,控制台访问、消息检查、ACK确认接收/消费、全量数据持久化等一系列读写操作都在sync.Map上进行。

avatar

数据持久化:

  • 方式一:周期性全量数据持久化。
  • 方式二:周期性全量数据持久化结合WAL预写日志(影响性能)。
  • (注:周期性全量数据持久化不会写入已经被消费的消息)

avatar

消息清理:

  • 自定义过期时间,定期清除过期消息,默认清除两天前的消息。
  • 通过设置可让消息队列立即清除已确认消费的消息。

网页端控制台:

  • 包含查看消息队列配置、生成近一周消息增长折线图、查看各状态消息数量、查看消费者与生产者客户端列表、搜索消息及查看集群节点列表功能。

avatar avatar avatar avatar avatar avatar avatar


单机性能测试

测试程序和消息队列都运行在本机。

配置:

  • 2019年的轻薄本,未充电状态,平衡模式

  • 4核8线程处理器:Intel(R) Core(TM) i5-8265U CPU @ 1.60GHz 1.80 GHz

  • LPDDR3内存:8.00 GB (7.85 GB 可用)

  • 固态硬盘:WDC PC SN720 SDAPNTW-512G

测试程序(部分)

avatar

选择数据持久化方式一,未开启WAL预写日志的情况下,模拟三十万并发消息插入,最高TPS可达十万。

avatar

选择数据持久化方式二,开启WAL预写日志,同样模拟三十万并发消息插入,受磁盘IO影响,消息插入速度存在大幅波动与衰减。

avatar


配置文件

  • ./application.yaml

打包方法

安装并配置go环境
应用打包:
  • 使用build.bat一键打包
Windows系统下直接运行./build.bat文件
自动执行打包命令
生成Linux二进制文件与Windows exe文件
  • 打包成windows exe
  GoLand终端cd到项目根目录,执行go build命令,生成exe文件
  • 打包成linux二进制文件
  cmd终端cd到项目根目录,依次执行下列命令:
  SET CGO_ENABLED=0
  SET GOOS=linux
  SET GOARCH=amd64
  go build
  生成二进制执行文件

部署方式

在服务器上部署

详细部署流程
在Windows上部署

/kapokmq                  # 文件根目录
    kapokmq.exe           # 打包后的exe文件
    application.yaml      # 配置文件
    /log                  # 日志目录
    /view                 # 前端-Vue项目打包文件
    MQDATA                # 持久化文件
    WAL.log               # 预写日志  
在Linux上部署

/kapokmq                  # 文件根目录
    kapokmq               # 打包后的二进制文件(后台运行指令:setsid ./kapokmq)
    application.yaml      # 配置文件
    /log                  # 日志目录
    /view                 # 前端-Vue项目打包文件
    MQDATA                # 持久化文件
    WAL.log               # 预写日志  
  • 控制台网页端,启动消息队列后访问:
http://localhost:port/#/Console

消息格式说明

  • 消息模板
MessageCode  string   #消息唯一标识码(由消息队列生成)
MessageData  string   #消息内容(一般为JSON格式的字符串)
Topic        string   #消息所属主题
CreateTime   int64    #消息创建时间(秒级时间戳)
ConsumedTime int64    #消息被消费时间(秒级时间戳)
DelayTime    int64    #延迟推送时间(单位:秒)
Status       int      #消息状态(-1:待消费。0:未到推送时间的延时消息。1:已消费)
  • 消息状态变更流程

avatar


客户端连接

Golang客户端连接工具

生产者客户端连接到消息队列

  • WebSocket ws://localhost:port/Producers/Conn/{topic}/{producerId}
WebSocket链接中的参数:
topic        //主题名称
ProducerId   //生产者客户端Id
  • 消息队列接收的消息格式

  • 生产者客户端推送给消息队列的Json字符串消息格式

{
    "MessageData":"hello",
    "DelayTime":0
}
  • 常规消息进入消息通道之前,状态将被设为待消费(Status:-1)。

  • 延时消息进入消息通道之前,状态将被设为未到时(Status:0)。

  • 消息队列接收到该消息后(写入日志后),通过该websocket连接向生产者客户端发送ACK,ACK内容为字符串"ok"

  • 如果生产者客户端选择异步发送消息方式,则可忽略该ACK。

  • 如果要追求消息的可靠性,可以利用该ACK机制发送同步消息,即生产者在发送完一条消息后,等待消息队列发来的ACK后,再继续发送下一条消息。

  • 消息队列发送给生产者的ACK字符串样式

"ok"

消费者客户端连接到消息队列

  • WebSocket ws://localhost:port/Consumers/Conn/{topic}/{consumerId}
WebSocket链接中的参数:
topic        //主题名称
consumerId   //消费者客户端Id
  • 通过WriteJSON()函数将model.Message类型的消息转为Json字符串发送

  • 消息队列推送给消费者客户端的Json字符串消息格式

{
    "MessageCode":"8c01b728ef82ba754a63e61daa43e83c61b744c7",
    "MessageData":"hello",
    "Topic":"test_topic",
    "CreateTime":1640975470,
    "ConsumedTime":1640975520,
    "DelayTime":0,
    "Status":-1
}
  • 消费者客户端接收并处理完该消息后,通过该websocket连接向消息队列异步发送ACK,ACK内容为消息的唯一标识码MessageCode

  • 消息队列接收到ACK后,将指定消息的状态更改为已消费(Status:1)。

  • 如果消息到达超时时间(mq.pushRetryTime,默认为300秒)仍未收到ACK,将进行重推。

  • 消费者客户端发送给消息队列的ACK字符串样式

"8c01b728ef82ba754a63e61daa43e83c61b744c7"

客户端连接流程演示

生产者、消费者客户端与消息队列建立连接后,需输入密钥登录

生产者客户端与消息队列建立连接

ws://127.0.0.1:8011/Producers/Conn/test_topic/1

服务端回应 2022-01-02 15:14:53
"Please enter the secret key"   //提示输入密钥

客户端发送 2022-01-02 15:15:06
"qqq"                           //输入错误的密钥

服务端回应 2022-01-02 15:15:06
"Secret key matching error"     //提示密钥出错

服务端回应 2022-01-02 15:15:06
"Please enter the secret key"   //再次提示输入密钥

客户端发送 2022-01-02 15:15:13
"test"                          //输出正确的密钥

服务端回应 2022-01-02 15:15:13
"Secret key matching succeeded" //提示密钥验证成功

客户端发送 2022-01-02 15:15:15   //生产者客户端可以向消息队列发送消息
"{.. Json SendMessage ..}"

服务端回应 2022-01-02 15:15:15   //消息队列接收到消息后,向生产者发送ACK
"ok"                           //ACK内容为字符串"ok"

avatar

消费者客户端与消息队列建立连接

ws://127.0.0.1:8011/Consumers/Conn/test_topic/1

服务端回应 2022-01-02 15:14:53
"Please enter the secret key"   //提示输入密钥

客户端发送 2022-01-02 15:15:06
"qqq"                           //输入错误的密钥

服务端回应 2022-01-02 15:15:06
"Secret key matching error"     //提示密钥出错

服务端回应 2022-01-02 15:15:06
"Please enter the secret key"   //再次提示输入密钥

客户端发送 2022-01-02 15:15:13
"test"                          //输出正确的密钥

服务端回应 2022-01-02 15:15:13
"Secret key matching succeeded" //提示密钥验证成功

服务端回应 2022-01-02 15:15:13   //消息队列可以向消费者客户端发送消息
"{.. Json Message ..}"
"{.. Json Message ..}"

客户端发送 2022-01-02 15:15:14   //消费者接收到消息后,向消息队列发送ACK
"8c01b728ef82ba754a63e61daa43e83c61b744c7"  //ACK内容为MessageCode
"sdiw2b7quh82basdsa17sdqdqw81d83c61bqdhhu"  //可异步发送确认消费ACK

avatar


主要模块

消息通道与消息列表 memory/mq.go
  • 使用golang的通道chan充当队列,所有主题的消息都将进入该通道,通道的缓冲空间大小决定了消息队列的吞吐量。

  • 使用sync.Map存储所有消息,用于数据持久化、消息检查、控制台数据获取。

//消息通道,用于存放待消费的消息(有缓冲区)
var messageChan = make(chan models.Message, messageChanBuffer)

// MessageList 消息列表,存放所有消息记录
var MessageList sync.Map

avatar

生产者消息接收 server/producer.go
  • 生产者客户端通过WebSocket连接到消息队列(github.com/gorilla/websocket),并发送消息到消息队列,消息被写入消息通道与消息列表。

  • 额外提供生产者HTTP接口,可通过HTTP请求向消息队列发送消息。

接口URL:

http://127.0.0.1:8011/Producer/Send

请求方式

POST

Content-Type

form-data

请求Header参数

参数 示例值 是否必填 参数描述
secretKey test 必填 访问密钥

请求Body参数

参数 示例值 是否必填 参数描述
messageData hello 必填 消息主体内容
topic test_topic 必填 消息所属主题
delayTime 0 必填 延时投送时间

成功响应示例

{
    "code":0, 
    "msg":"cbcebdfc446e237af323098fd125c5b161b7516c"
}
消费者消息推送 server/consumer.go
  • 消费者客户端通过WebSocket连接到消息队列(github.com/gorilla/websocket)。

  • 包含订阅/发布、点对点两种推送模式。

  • 消费者客户端接收消息后,将向消息队列发送一条ACK确认字符(内容为消息标识码messageCode),消息队列再根据此ACK将指定messageCode的消息更改为已消费状态。

数据持久化 persistent
  • 全量数据写入:定期将MessageList消息列表中的未消费消息和延时消息转换为[]byte类型数据,并写入二进制文件,类似于Redis RDB持久化方式。

  • 全量数据写入结合WAL日志:定期将内存中的消息全量持久化到二进制文件(不包括已被消费的消息),在两次全量数据持久化之间,每次接收或更新消息操作都将写入WAL日志,最大程度避免消息丢失。

  • 数据恢复:从二进制文件及WAL日志中读取数据,并将数据恢复至MessageList消息列表中,重新推送未消费的消息。

avatar

消息检查 server/check.go
  • 每隔一段时间遍历一次MessageList消息列表,检查其中是否有到达推送时间的延时消息、超时未消费的消息、过期消息。可重新推送消息及清除过期的消息。
主从同步 syncConn
  • 主从节点之间通过websocket连接同步消息。需先启动主节点,再启动从节点。

  • 主从节点连接后,从节点会定期对主节点进行心跳探测,如果检测到主节点宕机,从节点会自动开始推送消息给备用消费者客户端,当主节点重连后,从节点将重新关闭推送功能。

加入Gossip集群 cluster/join.go
  • 使用 github.com/hashicorp/memberlist 构建并链接Gossip集群服务。

  • 借助Gossip协议扩散同步的特性,可以随时向集群中添加新的消息队列节点。

控制台接口 console/api.go
  • 控制台接口:用于获取生产者/消费者客户端列表、消息队列配置信息及集群内消息队列节点列表,对消息进行查询或删除操作。

项目结构

cluster 集群相关
  • join.go 加入指定集群
config 配置类
  • config.go 项目配置文件加载
console 控制台
  • api.go 控制台接口
memory 内存数据容器
  • mq.go 消息通道与消息列表
middleware 中间件
  • cors.go 跨域配置

  • safe.go 安全验证

model 模板类
  • client.go 客户端模板

  • message.go 消息模板

  • node.go 集群节点模板

mqLog 日志记录
  • log.go 常规日志与WAL日志写入
persistent 持久化
  • fileRW.go 文件读写

  • persData.go 持久化到硬盘

  • recovery.go 数据恢复

router 路由
  • router.go 路由配置
server 服务层
  • producer.go 生产者消息接收

  • consumer.go 消费者消息推送

  • check.go 消息检查-消息重推与过期消息清理

syncConn 主从同步
  • master.go 主节点向从节点发送消息

  • slave.go 从节点接收消息

  • sync.go 主从同步初始化

utils 工具类
  • createCode.go 消息标识码生成

  • localTime.go 获取本地时间

  • md5Sign.go md5加密

  • toTimestamp.go 日期字符串转时间戳

view 前端Vue项目打包文件
  • css

  • js

  • index.html

application.yaml 配置文件
main.go 主函数

后期计划

实现功能 功能说明 当前进度
Java客户端 Maven包,websocket连接,Demo:https://gitee.com/dpwgc/kapokmq-java-client 未完成
拉模式消费 消费者主动拉取消息队列的消息 计划中
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

基于Go整合Gossip+WebSocket的轻量级分布式消息中间件。实现功能:订阅/发布推送、点对点推送、延时消息推送、消费失败自动重试、数据持久化、WAL预写日志、主从节点部署、集群负载均衡、ACK确认机制、内置网页端控制台 展开 收起
Go 等 3 种语言
Apache-2.0
取消

发行版 (1)

全部

贡献者

全部

近期动态

加载更多
不能加载更多了
Go
1
https://gitee.com/dpwgc/kapokmq.git
git@gitee.com:dpwgc/kapokmq.git
dpwgc
kapokmq
kapokmq
master

搜索帮助